Java NIO
Table of Contents
1. 基础
根据 UNIX 网络编程对 I/O 模型的分类可以归纳出 5 种模型:
- 阻塞 I/O:
- 用户进程在拿到数据前,一直等待 (占用 CPU 以及硬盘 I/O 通道等)。
- 非阻塞 I/O (数据没准备好可以做其他事情,提升了单个进程的质量):
- 如果 kernel 中的数据还没有准备好,那么它并不会阻塞用户进程,而是立刻返回一个 error。
- 从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。
- 用户进程收到结果是一个 error 时,知道数据还没有准备好,于是就可以在下次再进行 read 操作之前做其他事情,或者直接再次发送 read 操作。
- 一旦 kernel 中的数据准备好,并再次收到用户进程的 system call,那么它马上将数据拷贝到用户进程的内存 (这一阶段仍然阻塞),然后返回。
- I/O 多路复用 (主要用在网络 I/O 上,牺牲单个 I/O 质量,提升可处理的 I/O 数量):
- 所有用户进程统一把 I/O 请求提交到同一个地方。
- 用户进程提交 I/O 请求后被阻塞。
- 数据准备好后通知 (激活) 用户进程自己取。
- 异步 I/O:
- 所有用户进程统一把 I/O 请求提交到一个地方 (这一步与 I/O 多路复用相同)。
- 当用户进程收到通知时,数据已经被内核读取完毕,并放在了用户线程指定的缓冲区内。
- 内核在 I/O 完成后通知用户线程直接使用即可。
- 信号驱动 I/O (不实用,跳过)
JAVA 中的 I/O 可分为两类:
- 普通 JAVA I/O:
- 面向流 Stream Oriented
- 阻塞式 I/O blocking I/O (仅限网络 I/O)
- NIO:
- 面向缓冲区 Buffer Oriented
- 非阻塞式 I/O Non Blocking I/O (仅限网络 I/O)
- 选择器 Selectors (仅限网络 I/O)
简单理解: 普通 IO 使用面向流的处理方式,NIO 使用面向缓冲块的处理方式。
面向流的 I/O 一次一个字节地处理数据。面向缓冲块的 I/O 以缓冲块为单位处理数据。
NIO 主要由三个核心部分组成:
- Buffer 缓冲区
- Channel 数据管道
- Selector 选择器
java 的普通 I/O 已经被 NIO 重写过了,所以可以不必显式地使用 NIO。
2. 文件 I/O
2.1. 缓冲区与数据管道
NIO 使用 Buffer 缓冲区和 Channel 数据管道配合来处理数据,不以流的方式处理数据。
Buffer 中的数据通过 Channel 传输到输出端。Channel 不处理数据,它只负责运输数据。
传统 I/O 的流是单向的,NIO 有 Channel 这个概念,可双向读写。
2.2. Buffer 缓冲区
Buffer
类是缓冲区的抽象类。~ByteBuffer~ 是最常用的实现类,用于读写字节数据。
Buffer
类维护了 4 个核心变量来提供关于数据区的信息:
- 容量
Capacity
- 数据区能够容纳的数据元素的最大数量。容量在缓冲区创建时被设定,并且永远不能被改变 (底层是数组)。
- 上界
Limit
- 数据区里的数据的总数,代表了当前缓冲区中一共有多少数据。
- 位置
Position
- 下一个要被读或写的元素的位置。
Position
会自动由相应的get()
和put()
函数更新。
- 下一个要被读或写的元素的位置。
- 标记
Mark
- 用于记录上一次读写的位置。
代码演示:
// 创建一个缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 看一下初始时4个核心变量的值
System.out.println("初始时-->limit--->"+byteBuffer.limit());
System.out.println("初始时-->position--->"+byteBuffer.position());
System.out.println("初始时-->capacity--->"+byteBuffer.capacity());
System.out.println("初始时-->mark--->" + byteBuffer.mark());
System.out.println("--------------------------------------");
// 添加一些数据到缓冲区中
String s = "Gridsah";
byteBuffer.put(s.getBytes());
// 看一下初始时4个核心变量的值
System.out.println("put完之后-->limit--->"+byteBuffer.limit());
System.out.println("put完之后-->position--->"+byteBuffer.position());
System.out.println("put完之后-->capacity--->"+byteBuffer.capacity());
System.out.println("put完之后-->mark--->" + byteBuffer.mark());
System.out.println("--------------------------------------");
// flip() 方法
byteBuffer.flip();
System.out.println("flip完之后-->limit--->"+byteBuffer.limit());
System.out.println("flip完之后-->position--->"+byteBuffer.position());
System.out.println("flip完之后-->capacity--->"+byteBuffer.capacity());
System.out.println("flip完之后-->mark--->" + byteBuffer.mark());
System.out.println("--------------------------------------");
// get() 方法
// 创建一个 limit 大小的字节数组
byte[] bytes = new byte[byteBuffer.limit()];
// 将读取的数据装进字节数组中
byteBuffer.get(bytes);
// 输出数据
System.out.println(new String(bytes, 0, bytes.length));
System.out.println("get完之后-->limit--->"+byteBuffer.limit());
System.out.println("get完之后-->position--->"+byteBuffer.position());
System.out.println("get完之后-->capacity--->"+byteBuffer.capacity());
System.out.println("get完之后-->mark--->" + byteBuffer.mark());
输出如下:
初始时-->limit--->1024 初始时-->position--->0 初始时-->capacity--->1024 初始时-->mark--->java.nio.HeapByteBuffer[pos=0 lim=1024 cap=1024] -------------------------------------- put完之后-->limit--->1024 put完之后-->position--->7 put完之后-->capacity--->1024 put完之后-->mark--->java.nio.HeapByteBuffer[pos=7 lim=1024 cap=1024] -------------------------------------- flip完之后-->limit--->7 flip完之后-->position--->0 flip完之后-->capacity--->1024 flip完之后-->mark--->java.nio.HeapByteBuffer[pos=0 lim=7 cap=1024] -------------------------------------- Gridsah get完之后-->limit--->7 get完之后-->position--->7 get完之后-->capacity--->1024 get完之后-->mark--->java.nio.HeapByteBuffer[pos=7 lim=7 cap=1024]
NIO 给了一个 flip()
方法从缓存区拿数据: 这个方法改动了 position
和 limit
的位置。
调用完 filp()
后: position
是开始读的位置,而 limit
限制读到哪里。
一般称 filp()
的作用为 切换成读模式 。每当从缓存区读取数据时它就被调用。
切换成读模式之后,可以用 get()
读取数据,读取完之后有 position = limit
。
2.3. FileChannel 通道
Channel
只负责传输数据,所有数据操应作用于 Buffer
。 Channel
接口的主要实现类有:
FileChannel
SocketChannel
ServerSocketChannel
DatagramChannel
获取通道代码演示:
// 1. 通过本地 IO 的方式来获取通道
FileInputStream fileInputStream = new FileInputStream("F:\\JavaEE常用框架\\wtf.md");
// 得到文件的输入通道
FileChannel inChannel = fileInputStream.getChannel();
// 2. jdk1.7 后通过静态方法 open() 获取通道
FileChannel.open(Paths.get("F:\\JavaEE常用框架\\wtf.md"), StandardOpenOption.WRITE);
使用 FileChannel
配合 Buffer
实现文件复制代码演示:
try {
// 创建通道
FileChannel inChannel = new FileInputStream("1.jpg").getChannel();
FileChannel outChannel = new FileInputStream("2.jpg").getChannel();
// 创建缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
// 将通道中的数据存入缓冲区
while(inChannel.read(buf) != -1){
buf.flip(); // 切换读模式
outChannel.write(buf); // 将缓冲区数据传入通道
buf.clear(); // 清空缓冲区
}
} catch (IOException e) {
e.printStackTrace();
}
使用 内存映射文件 实现文件复制 (直接操作缓冲区) 代码演示:
FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
// 映射内存
MappedByteBuffer inMappedBuf = inChannel.map(MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMappedBuf = outChannel.map(MapMode.READ_Write, 0, inChannel.size());
// 直接对缓冲区进行读写
byte[] dst = new Byte[inMappedBuf.limit()];
inMappedBuf.get(dst);
outMappedBuf.put(dst);
通道间通过 transfer()
实现数据传输 (直接操作缓冲区) 代码演示:
FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
inChannel.transferTo(0, inChannel.size(), outChannel);
2.4. 直接与非直接缓存区
将数据读入缓缓存时,缓存区可以有两种情况:
- 非直接缓存区:
- 程序所有读写操作必须经过一个从内核空间 (OS) 到用户空间 (JVM) 的 copy 阶段。
- 内存中有两个相同的缓存区块,一个属于内核空间,一个属于用户空间。
- 优点: 安全。缺点: 速度极慢。
- 直接缓冲区:
- 不需要经过 copy 阶段。
- 内存中只有一个缓存区块,内核 (OS) 与用户空间 (JVM) 共享这个缓存区块。
- 优点: 速度快。缺点: 创建和回收代价高。不安全。大文件操作时 CPU 占用极高。
创建非直接缓存区时,用 allocate()
工厂方法分配空间,将缓冲区建立在 JVM 的内存中。
直接缓存区创建有两种方式:
- 创建缓存时用
allocateDirect()
工厂方法分配空间。 - 在
FileChannel
上调用map()
方法,将文件直接映射到内存中创建。
缓存区可以调用 isDirect()
方法来区分直接与非直接。
2.5. scatter 和 gather 字符集
分散读取 scatter
: 将读取到的数据 依次 填满多个缓存区。
聚集写入 gather
: 将多个缓存区中的数据 按顺序 集中写入到一个通道中。
分散读取与聚集写入代码演示:
FileChannel channelA = new FileInputStream("F:\\wtf.md").getChannel();
FileChannel channelB = new RandomAccessFile("2.txt", "rw").getChannel();
ByteBuffer buf1 = ByteBuffer.allocate(100);
ByteBuffer buf2 = ByteBuffer.allocate(1024);
ByteBuffer buf3 = ByteBuffer.allocate(100);
ByteBuffer buf4 = ByteBuffer.allocate(1024);
ByteBuffer[] bufsA = {buf1, buf2};
ByteBuffer[] bufsB = {buf3, buf4};
// 分散读取
channelA.read(bufsA);
// 聚集写入
channelB.write(bufsB);
for (ByteBuffer[] byteBuffer : bufs) {
byteBuffer.flip();
}
System.out.println(new String(bufs[0].array(), 0, bufs[0].limit()));
System.out.println("-----------------")
System.out.println(new String(bufs[1].array(), 0, bufs[1].limit()))
2.6. 字符集
重点在于编码格式和解码格式一致:
Charset csA = Charset.forName("GBK");
// 获取编码器
CharsetEncoder ce = csA.newEncoder();
// 获取解码器
CharsetDecoder cd = csA.newDecoder();
// 创建字符缓存区
CharBuffer cBuf1 = CharBuffer.allocate(1024);
//放入字符
cBuf1.put("what?");
cBuf1.flip();
// 对字符编码
ByteBuffer bBuf = ce.encode(cBuf1);
for (int i = 0; bBuf < 5; i++) {
System.out.print("["+bBuf.get() + "]");
}
System.out.print("\n");
// 对字符解码
bBuf.flip();
CharBuffer cBuf2 = cd.decode(bBuf);
for (int i = 0; i < cBuf2.limit(); i++) {
System.out.print("["+cBuf2.get() + "]");
}
输出如下:
[119][104][97][116][63] [w][h][a][t][?]
3. 网络 I/O
NIO (non blocking I/O) 是在网络层理解的,非阻塞的特点也是网络 I/O 中体现。
NIO 在网络 I/O 中采用多路复用的 I/O 模型,对于操作文件的 FileChannel
来说依旧采用阻塞式的 I/O 模型。
SelectableChannel
是网络通信常用的 Channel
接口的实现,它的子类有:
SocketChannel
ServerSocketChannel
DatagramChannel
Pipe.SinkChannel
Pipe.SourceChannel
3.1. 阻塞式网络 I/O
阻塞式 I/O 模型不需要 Selector
选择器参与,代码演示:
// 客户端
// 1. 获取通道
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 6666));
// 2. 创建从本地读取文件的通道
FileChannel fileChannel = FileChannel.open(Paths.get("C:\\a.txt"), StandardOpenOption.READ);
// 3. 创建缓存区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 4. 读取本地文件并发送出去
while (fileChannel.read(buffer) != -1) {
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
// 告诉服务端已经发完数据了
socketChannel.shutdownOoutput();
// 等待服务器的传输完成响应
int len = 0;
while((len = socketChannel.read(buffer)) != -1) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, len));
buffer.clear();
}
// 5. 关闭通道
fileChannel.close();
socketChannel.close();
// 服务端
// 1. 获取通道
ServerSocketChannel server = ServerSocketChannel.open();
// 2. 创建写入数据的通道
FileChannel outChannel = FileChannel.open(Paths.get("b.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE)
// 3. 绑定 socket 连接
server.bind(new InetSocketAddress("localhost",6666));
// 4. 获取客户端连接
SocketChannel socketChannel = server.accept();
// 5. 创建缓存区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 6. 将客户端传递过来的文件保存到本地
while (client.read(buffer) != -1) {
buffer.flip();
outChannel.write(buffer);
buffer.clear();
}
// 服务端收完数据后,通知客户端
buffer.put("success".getBytes());
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
// 7.关闭通道
outChannel.close();
socketChannel.close();
server.close();
3.2. NIO 非阻塞
使用非阻塞模式可以使客户端不用显式通知服务器数据发送完毕。代码演示:
// 客户端:
// 1. 获取通道
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 6666));
// 切换成非阻塞模式
socketChannel.configureBlocking(false);
// 2. 创建从本地读取文件的通道
FileChannel fileChannel = FileChannel.open(Paths.get("C:\\1.png"), StandardOpenOption.READ);
// 3. 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 4.读取本地文件发送到服务器
while (fileChannel.read(buffer) != -1) {
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
// 5. 关闭流
fileChannel.close();
socketChannel.close();
// 服务端
// 1.获取通道
ServerSocketChannel server = ServerSocketChannel.open();
// 2.切换成非阻塞模式
server.configureBlocking(false);
// 3. 绑定连接
server.bind(new InetSocketAddress("localhost", 6666));
// 4. 获取选择器
Selector selector = Selector.open();
// 将通道注册到选择器上,指定接收 监听通道 事件
server.register(selector, SelectionKey.OP_ACCEPT);
// 5. 轮询地获取选择器上 已就绪 的事件
// 有事件已就绪就开始处理 select()>0 为已就绪
while (selector.select() > 0) {
// 6. 获取当前选择器所有注册的“选择键”(已就绪的监听事件)
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 7. 处理所有 已就绪 的事件
while (iterator.hasNext()) {
// 按顺序接收 就绪事件
SelectionKey selectionKey = iterator.next();
// 不同的事件做不同的事
if (selectionKey.isAcceptable()) { // 建立连接 事件就绪
// 获取客户端的链接
SocketChannel socketChannel = server.accept();
// 把连接切换成非阻塞状态
socketChannel.configureBlocking(false);
// 把连接注册到选择器上,监听 读就绪 事件
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) { // 读 事件就绪
// 获取当前选择器读就绪状态的通道
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// 创建读数据的缓存区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 创建写入数据的通道(写模式、文件不存在则创建)
FileChannel outChannel = FileChannel.open(Paths.get("2.png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
while (socketChannel.read(buffer) > 0) {
buffer.flip();
outChannel.write(buffer);
buffer.clear();
}
// 文件保存完后通知客户端
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put("success".getBytes());
writeBuffer.flip();
socketChannel.write(writeBuffer);
}
// 移除已经处理过的选择键(事件)
iterator.remove();
}
}
但是在非阻塞模式下,客户端要获取服务端的数据,也要在 Selector
上注册,监听读事件。代码演示:
// 客户端:
// 1. 获取通道
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 6666));
// 切换成非阻塞模式
socketChannel.configureBlocking(false);
// 2. 创建从本地读取文件的通道
FileChannel fileChannel = FileChannel.open(Paths.get("C:\\1.png"), StandardOpenOption.READ);
// 3. 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 4. 获取选择器
Selector selector = Selector.open();
// 5. 将通道注册到选择器中,获取服务端返回的数据
socketChannel.register(selector, SelectionKey.OP_READ);
// 6. 读取本地文件发送到服务器
while (fileChannel.read(buffer) != -1) {
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
// 7. 轮询地获取选择器上 已就绪 的事件
// 有事件已就绪就开始处理 select()>0 为已就绪
while (selector.select() > 0) {
// 获取当前选择器所有注册的“选择键”(已就绪的监听事件)
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 处理所有 已就绪 的事件
while (iterator.hasNext()) {
// 按顺序接收 就绪事件
SelectionKey selectionKey = iterator.next();
// 不同的事件做不同的事
if (selectionKey.isReadable()) { // 读事件就绪
// 获取当前选择器读就绪状态的通道
SocketChannel tempSocketChannel = (SocketChannel) selectionKey.channel();
// 创建读数据的缓存区
ByteBuffer tempByteBuffer = ByteBuffer.allocate(1024);
// 知道服务端要返回响应的数据给客户端,客户端在这里接收
int readBytes = tempSocketChannel.read(tempByteBuffer);
if (readBytes > 0) {
tempByteBuffer.flip();
System.out.println(new String(tempByteBuffer.array(), 0, readBytes));
}
}
// 移除已经处理过的选择键(事件)
iterator.remove();
}
}
3.3. UDP 传输 DatagramChannel
代码演示:
// 客户端
DatagramChannel datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
ByteBuffer buf = ByteBuffer.allocate(1024);
Scanner scan = new Scanner(System.in);
while(input.hasNext()){
String str = input.nextLine();
buf.put((new Date().toString() + ":" + str).getBytes());
buf.flip();
dc.send(buf, new InetSocketAddress("127.0.0.1", 8989));
buf.clear();
}
datagramChannel.close();
// 服务端
DatagramChannel datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
datagramChannel.bind(new InetSocketAddress(8989));
Selector selector = Selector.open();
datagramChannel.register(selector, SelectionKey.OP_READ);
while (selector.select() > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
datagramChannel.receive(buffer);
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.limit()));
buffer.clear();
}
}
iterator.remove();
}
3.4. 管道
管道是两个线程之间的 单向连接 。
Pipe 有一个 source 通道和一个 sink 通道。数据会被写入 sink 通道,从 source 通道获取。
代码演示:
// 获取管道
Pipe pipe = Pipe.open();
// 将数据写入缓存区并发送
ByteBuffer buf = ByteBuffer.allocate(1024);
Pipe.SinkChannel sinkChannel = pipe.sink();
buf.put("one way data transmission.".getBytes());
buf.flip();
sinkChannel.write(buf);
buf.clear();
// 读取缓存区的数据
Pipe.SourceChannel sourceChannel = pipe.source();
int len = sourceChannel.read(buf);
System.out.println(new String(buf.array(), 0, len));
sourceChannel.close();
sinkChannel.close();